/**
 * FileName: StructuredStreamingHDFS2
 * Author:   SAMSUNG-PC 孙中军
 * Date:     2019/01/18 10:01
 * Description: Reads data from HDFS as a stream, applies a simple transformation,
 *              and writes the result out (console sink currently active; HDFS sink
 *              is available but commented out).
 */
package cn.com.bonc.app;

import cn.com.bonc.util.DataFilterAndOperatorUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;


import java.io.IOException;


/**
 * Spark Structured Streaming driver: tails a text-file directory on HDFS,
 * runs each line through the project's filter/transform step, and emits the
 * result to a console sink (an HDFS text sink is kept below, disabled).
 */
public class StructuredStreamingHDFS2 {

	/** HDFS directory monitored for newly arriving input files. */
	private static final String SOURCE_PATH = "/test_szj/source_data";
	/** Target/checkpoint location for the (currently disabled) HDFS text sink. */
	private static final String SAVE_PATH = "hdfs://192.168.70.21:9000/test_szj/parquet5";

	public static void main(String[] args) {

		SparkSession spark = SparkSession
				.builder()
				.appName("MyHDFASTestApp2")
				.getOrCreate();

		// Read the HDFS directory as a streaming text source; each line becomes
		// one String element.
		// latestFirst=false  -> process the oldest files first (preserve arrival order)
		// fileNameOnly=false -> identify already-seen files by their full path
		Dataset<String> hdfsDataset = spark
				.readStream()
				.option("latestFirst", "false")
				.option("fileNameOnly", "false")
				.text(SOURCE_PATH)
				.selectExpr("CAST(value AS STRING)")
				.as(Encoders.STRING());

		// Project-specific filtering/processing step (semantics defined in
		// DataFilterAndOperatorUtil, outside this file).
		Dataset<String> dataset = DataFilterAndOperatorUtil.getInstance().filter(hdfsDataset);

		// Debug sink: print each micro-batch to the console, append mode
		// (only newly added rows are emitted per trigger).
		StreamingQuery query = dataset
				.writeStream()
				.format("console")
				.outputMode(OutputMode.Append())
				.start();

		// Alternative sink: persist the stream to HDFS as text files.
		// Disabled while debugging with the console sink above.
//		StreamingQuery query = dataset
//				.writeStream()
//				.format("text")
//				.option("checkpointLocation", SAVE_PATH)
//				.option("path", SAVE_PATH)
//				.start();

		// Block until the query terminates. Propagate a failure instead of
		// swallowing it with printStackTrace(), so the driver process exits
		// abnormally when the streaming query dies.
		try {
			query.awaitTermination();
		} catch (StreamingQueryException e) {
			throw new RuntimeException("Streaming query terminated with an error", e);
		}
	}

}
